# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

"""Internal utilities for gRPC Python."""

import collections
import threading
import time

import six

import grpc
from grpc import _common
from grpc.framework.foundation import callable_util

_DONE_CALLBACK_EXCEPTION_LOG_MESSAGE = (
    'Exception calling connectivity future "done" callback!')


class RpcMethodHandler(
    collections.namedtuple(
        '_RpcMethodHandler',
        ('request_streaming', 'response_streaming', 'request_deserializer',
         'response_serializer', 'unary_unary', 'unary_stream', 'stream_unary',
         'stream_stream',)),
    grpc.RpcMethodHandler):
  pass


class DictionaryGenericHandler(grpc.GenericRpcHandler):

  def __init__(self, service, method_handlers):
    self._method_handlers = {
        _common.fully_qualified_method(service, method): method_handler
        for method, method_handler in six.iteritems(method_handlers)}

  def service(self, handler_call_details):
    return self._method_handlers.get(handler_call_details.method)


class _ChannelReadyFuture(grpc.Future):

  def __init__(self, channel):
    self._condition = threading.Condition()
    self._channel = channel

    self._matured = False
    self._cancelled = False
    self._done_callbacks = []

  def _block(self, timeout):
    until = None if timeout is None else time.time() + timeout
    with self._condition:
      while True:
        if self._cancelled:
          raise grpc.FutureCancelledError()
        elif self._matured:
          return
        else:
          if until is None:
            self._condition.wait()
          else:
            remaining = until - time.time()
            if remaining < 0:
              raise grpc.FutureTimeoutError()
            else:
              self._condition.wait(timeout=remaining)

  def _update(self, connectivity):
    with self._condition:
      if (not self._cancelled and
          connectivity is grpc.ChannelConnectivity.READY):
        self._matured = True
        self._channel.unsubscribe(self._update)
        self._condition.notify_all()
        done_callbacks = tuple(self._done_callbacks)
        self._done_callbacks = None
      else:
        return

    for done_callback in done_callbacks:
      callable_util.call_logging_exceptions(
          done_callback, _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE, self)

  def cancel(self):
    with self._condition:
      if not self._matured:
        self._cancelled = True
        self._channel.unsubscribe(self._update)
        self._condition.notify_all()
        done_callbacks = tuple(self._done_callbacks)
        self._done_callbacks = None
      else:
        return False

    for done_callback in done_callbacks:
      callable_util.call_logging_exceptions(
          done_callback, _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE, self)

  def cancelled(self):
    with self._condition:
      return self._cancelled

  def running(self):
    with self._condition:
      return not self._cancelled and not self._matured

  def done(self):
    with self._condition:
      return self._cancelled or self._matured

  def result(self, timeout=None):
    self._block(timeout)
    return None

  def exception(self, timeout=None):
    self._block(timeout)
    return None

  def traceback(self, timeout=None):
    self._block(timeout)
    return None

  def add_done_callback(self, fn):
    with self._condition:
      if not self._cancelled and not self._matured:
        self._done_callbacks.append(fn)
        return

    fn(self)

  def start(self):
    with self._condition:
      self._channel.subscribe(self._update, try_to_connect=True)

  def __del__(self):
    with self._condition:
      if not self._cancelled and not self._matured:
        self._channel.unsubscribe(self._update)


def channel_ready_future(channel):
  ready_future = _ChannelReadyFuture(channel)
  ready_future.start()
  return ready_future
